# SPDX-FileCopyrightText: Copyright (c) 2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
from typing import List

from nemoguardrails.embeddings.embedding_providers import (
    EmbeddingModel,
    embeddings_executor,
)


class FastEmbedEmbeddingModel(EmbeddingModel):
    """Embedding model using FastEmbed.

    This class represents an embedding model that utilizes the FastEmbed library
    for generating sentence embeddings.

    Args:
        embedding_model (str): The name or path of the pre-trained model.

    Attributes:
        model: The model used for encoding sentences.
        embedding_size: The dimensionality of the sentence embeddings generated by the model.
    """

    def __init__(self, embedding_model: str):
        from fastembed import TextEmbedding as Embedding

        # Enabling a short form model name for all-MiniLM-L6-v2.
        if embedding_model == "all-MiniLM-L6-v2":
            embedding_model = "sentence-transformers/all-MiniLM-L6-v2"
        self.embedding_model = embedding_model

        try:
            self.model = Embedding(embedding_model)
        except ValueError as ex:
            # Sometimes the cached model in the temporary folder gets removed,
            # but the folder still exists, which causes an error. In this case,
            # we fall back to an explicit cache directory.
            if "Could not find model.onnx in" in str(ex):
                self.model = Embedding(embedding_model, cache_dir=".cache")
            else:
                # Bare raise preserves the original traceback.
                raise

        # Probe the model once to determine the embedding dimensionality.
        # `embed` yields numpy arrays lazily; we only need the first one.
        self.embedding_size = len(next(iter(self.model.embed("test"))).tolist())

    async def encode_async(self, documents: List[str]) -> List[List[float]]:
        """Encode a list of documents into their corresponding sentence embeddings.

        The embedding computation is offloaded to `embeddings_executor` so the
        event loop is not blocked.

        Args:
            documents (List[str]): The list of documents to be encoded.

        Returns:
            List[List[float]]: The list of sentence embeddings, where each embedding is a list of floats.
        """
        loop = asyncio.get_running_loop()

        # NOTE: `self.model.embed` is a generator function — calling it only
        # creates the generator and does no work. We must run `self.encode`
        # (which fully consumes the generator) inside the executor; otherwise
        # the heavy computation would happen lazily on the event-loop thread
        # when the result is iterated.
        result = await loop.run_in_executor(
            embeddings_executor, self.encode, documents
        )

        return result

    def encode(self, documents: List[str]) -> List[List[float]]:
        """Encode a list of documents into their corresponding sentence embeddings.

        Args:
            documents (List[str]): The list of documents to be encoded.

        Returns:
            List[List[float]]: The list of sentence embeddings, where each embedding is a list of floats.
        """
        # `embed` yields numpy arrays; convert each to a plain list of floats.
        return [embedding.tolist() for embedding in self.model.embed(documents)]
